/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.test.util;

import org.apache.flink.test.parameters.ParameterProperty;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Collection of file-related utilities. */
public class FileUtils {

    private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);

    private static final ParameterProperty<Path> PROJECT_ROOT_DIRECTORY =
            new ParameterProperty<>("rootDir", Paths::get);
    private static final ParameterProperty<Path> DISTRIBUTION_DIRECTORY =
            new ParameterProperty<>("distDir", Paths::get);

    /**
     * Matches the given {@link Pattern} against all lines in the given file, and replaces all
     * matches with the replacement generated by the given {@link Function}. All unmatched lines and
     * provided replacements are written into the file, with the order corresponding to the original
     * content. Newlines are automatically added to each line; this implies that an empty
     * replacement string will result in an empty line to be written.
     */
    public static void replace(Path file, Pattern pattern, Function<Matcher, String> replacer)
            throws IOException {
        final List<String> fileLines = Files.readAllLines(file);
        try (PrintWriter pw =
                new PrintWriter(
                        new OutputStreamWriter(
                                Files.newOutputStream(file, StandardOpenOption.TRUNCATE_EXISTING),
                                StandardCharsets.UTF_8.name()))) {
            for (String fileLine : fileLines) {
                Matcher matcher = pattern.matcher(fileLine);
                if (matcher.matches()) {
                    String replacement = replacer.apply(matcher);
                    pw.println(replacement);
                } else {
                    pw.println(fileLine);
                }
            }
        }
    }

    public static Path findFlinkDist() {
        Optional<Path> distributionDirectory = DISTRIBUTION_DIRECTORY.get();
        if (!distributionDirectory.isPresent()) {
            LOG.debug(
                    "The '{}' property was not set; attempting to automatically determine distribution location.",
                    DISTRIBUTION_DIRECTORY.getPropertyName());

            Path projectRootPath;
            Optional<Path> projectRoot = PROJECT_ROOT_DIRECTORY.get();
            if (projectRoot.isPresent()) {
                // running with maven
                projectRootPath = projectRoot.get();
            } else {
                // running in the IDE; working directory is test module
                Optional<Path> projectRootDirectory =
                        findProjectRootDirectory(Paths.get("").toAbsolutePath());
                // this distinction is required in case this class is used outside of Flink
                if (projectRootDirectory.isPresent()) {
                    projectRootPath = projectRootDirectory.get();
                } else {
                    throw new IllegalArgumentException(
                            "The 'distDir' property was not set and the flink-dist module could not be found automatically."
                                    + " Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir=<path> .");
                }
            }
            Optional<Path> distribution = findDistribution(projectRootPath);
            if (!distribution.isPresent()) {
                throw new IllegalArgumentException(
                        "The 'distDir' property was not set and a distribution could not be found automatically."
                                + " Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir=<path> .");
            } else {
                distributionDirectory = distribution;
            }
        }
        return distributionDirectory.get();
    }

    private static Optional<Path> findProjectRootDirectory(Path currentDirectory) {
        // move up the module structure until we find flink-dist; relies on all modules being
        // prefixed with 'flink'
        do {
            if (Files.exists(currentDirectory.resolve("flink-dist"))) {
                return Optional.of(currentDirectory);
            }
            currentDirectory = currentDirectory.getParent();
        } while (currentDirectory.getFileName().toString().startsWith("flink"));
        return Optional.empty();
    }

    private static Optional<Path> findDistribution(Path projectRootDirectory) {
        final Path distTargetDirectory =
                projectRootDirectory.resolve("flink-dist").resolve("target");
        try {
            Collection<Path> paths =
                    org.apache.flink.util.FileUtils.listFilesInDirectory(
                            distTargetDirectory, FileUtils::isDistribution);
            if (paths.size() == 0) {
                // likely due to flink-dist not having been built
                return Optional.empty();
            }
            if (paths.size() > 1) {
                // target directory can contain distributions for multiple versions, or it's just a
                // dirty environment
                LOG.warn(
                        "Detected multiple distributions under flink-dist/target. It is recommended to explicitly"
                                + " select the distribution by setting the '{}}' property.",
                        DISTRIBUTION_DIRECTORY.getPropertyName());
            }
            // jar should be in /lib; first getParent() returns /lib, second getParent() returns
            // distribution directory
            return Optional.of(paths.iterator().next().getParent().getParent());
        } catch (IOException e) {
            LOG.error("Error while searching for distribution.", e);
            return Optional.empty();
        }
    }

    private static boolean isDistribution(Path path) {
        // check for `lib/flink-dist*'
        // searching for the flink-dist jar is not sufficient since it also exists in the modules
        // 'target' directory
        return path.getFileName().toString().contains("flink-dist")
                && path.getParent().getFileName().toString().equals("lib");
    }
}
